/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;

import java.util.*;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.DependencyOrderWalker;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;

/**
 * An optimizer that removes unnecessary temp stores (Stores generated by
 * the MRCompiler to bridge different jobs - even though a real store
 * is produced from the same data). The pattern looks like this:
 *
 *    ------------- Split ---------
 *    |                           |
 *  Store(InterStorage)         Store(StoreFunc)
 * 
 * Followed by a load of the tmp store in a dependent MapReduceOper.
 *
 * This optimizer removes the store, collapses the split if only one
 * branch remains and adjusts the loads to load from the real store.
 *
 * The situation is produced by something we do to the logical plan in
 * PigServer. There we change:
 *
 *    PreOp
 *     |
 *    Store
 *   
 *    Load
 *     |
 *    PostOp
 *
 * To:
 *
 *           PreOp
 *             |
 *            / \
 *           /   \
 *       PostOp  Store
 *
 * If there is a job boundary between pre and post we will end up in
 * this case.
 *
 */
class NoopStoreRemover extends MROpPlanVisitor {

    private final Log log = LogFactory.getLog(getClass());

    /**
     * Maps the file name of every tmp store scheduled for removal to
     * the file spec that loads of that file get redirected to. It is
     * created once per visitor, so entries recorded while visiting one
     * MapReduceOper are still available when later ones are visited.
     */
    private final Map<String, FileSpec> replacementMap;

    /** Removals scheduled while visiting the current MapReduceOper. */
    private List<RemovableStore> removalQ;

    /**
     * Stores seen while visiting the current MapReduceOper; their
     * input specs are cleared at the end of {@link #visitMROp}.
     */
    private List<POStore> storeQ;

    /**
     * Creates a remover that walks the given plan in dependency order.
     *
     * @param plan the map reduce plan to optimize
     */
    NoopStoreRemover(MROperPlan plan) {
        super(plan, new DependencyOrderWalker<MapReduceOper, MROperPlan>(plan));
        replacementMap = new HashMap<String, FileSpec>();
    }

    /**
     * Scans the map and reduce plans of one MapReduceOper, redirects
     * loads of previously removed tmp stores, removes the tmp stores
     * found in this operator and finally clears the input spec of
     * every store that was visited.
     *
     * @param mr the operator to process
     * @throws VisitorException if visiting one of the physical plans fails
     */
    @Override
    public void visitMROp(MapReduceOper mr) throws VisitorException {
        // The queues are per operator; only replacementMap carries
        // over from one operator to the next.
        removalQ = new ArrayList<RemovableStore>();
        storeQ = new ArrayList<POStore>();

        // This situation can happen in map and reduce (not combine)
        new PhysicalRemover(mr.mapPlan).visit();
        new PhysicalRemover(mr.reducePlan).visit();

        // Plans are only modified after both walks have finished, so
        // nothing is restructured while a walker is traversing it.
        for (RemovableStore removable : removalQ) {
            removeStore(removable);
        }

        for (POStore store : storeQ) {
            // don't need the input filespec anymore; and we don't
            // want to serialize it in the job control compiler.
            store.setInputSpec(null);
        }
    }

    /**
     * Removes a scheduled tmp store plan from its split and, if only
     * one nested plan is left afterwards, replaces the split by the
     * store of that remaining plan.
     *
     * A PlanException is not propagated; it is logged together with
     * its stack trace because the plan may have been modified only
     * partially at that point.
     *
     * @param rem the removal scheduled by the physical plan visitor
     */
    private void removeStore(RemovableStore rem) {
        try {
            // Remove the store plan from the nested split plan.
            rem.split.removePlan(rem.storePlan);

            // Collapse split if only one nested plan remains.
            List<PhysicalPlan> remaining = rem.split.getPlans();
            if (remaining.size() == 1) {
                // A removal is only scheduled when the split also held
                // a single-operator plan with a non-tmp POStore (see
                // visitSplit), so the one plan left over is expected
                // to be that plan and its root to be a POStore.
                PhysicalPlan plan = remaining.get(0);
                POStore store = (POStore)plan.getRoots().get(0);
                plan.remove(store);
                store.setInputs(rem.split.getInputs());
                rem.plan.replace(rem.split, store);
            }
        } catch (PlanException pe) {
            // Pass the exception along so the stack trace is not lost.
            log.warn("failed to remove unnecessary store from plan: "+pe.getMessage(), pe);
        }
    }

    /**
     * Immutable record of one scheduled removal: the nested plan that
     * holds the tmp store, the plan that contains the split, and the
     * split itself.
     */
    private static class RemovableStore {
        final PhysicalPlan storePlan;
        final PhysicalPlan plan;
        final POSplit split;

        RemovableStore(PhysicalPlan storePlan, PhysicalPlan plan, POSplit split) {
            this.storePlan = storePlan;
            this.plan = plan;
            this.split = split;
        }
    }

    /**
     * Physical plan visitor that redirects loads, collects stores and
     * schedules tmp stores for removal. It does not change the
     * structure of the plan itself; that is done by
     * {@link NoopStoreRemover#removeStore} once the walk is over.
     */
    private class PhysicalRemover extends PhyPlanVisitor {
        PhysicalRemover(PhysicalPlan plan) {
            super(plan, new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(plan));
        }

        @Override
        public void visit() throws VisitorException {
            super.visit();
        }

        /**
         * Points the load at the recorded replacement file spec if
         * its current file name belongs to a tmp store that was
         * scheduled for removal.
         */
        @Override
        public void visitLoad(POLoad load) {
            // As we go through update the load ops of the tmp stores
            // that we removed with the resulting other stores output.
            FileSpec replacement = replacementMap.get(load.getLFile().getFileName());
            if (replacement != null) {
                load.setLFile(replacement);
            }
        }

        /**
         * Remembers the store so that its input spec can be cleared
         * once the enclosing MapReduceOper has been processed.
         */
        @Override
        public void visitStore(POStore store) {
            storeQ.add(store);
        }

        /**
         * Looks for the pattern "tmp store next to a store with an
         * input spec" among the single-operator nested plans of the
         * split and, if found, schedules the tmp store for removal
         * and records where loads of its file should read from.
         *
         * @param split the split to inspect
         * @throws VisitorException if visiting the nested plans fails
         */
        @Override
        public void visitSplit(POSplit split) throws VisitorException {
            // Let the base class handle the split first. The input
            // specs read below are still set at this point; they are
            // only cleared at the end of visitMROp.
            super.visitSplit(split);

            FileSpec lFile = null;
            FileSpec sFile = null;
            PhysicalPlan tmpStore = null;

            for (PhysicalPlan plan : split.getPlans()) {
                // Only nested plans that consist of a lone store are
                // of interest.
                if (plan.size() != 1) {
                    continue;
                }
                PhysicalOperator op = plan.getRoots().get(0);
                if (!(op instanceof POStore)) {
                    continue;
                }
                POStore store = (POStore)op;

                if (store.isTmpStore()) {
                    // tmp store means introduced by the
                    // MRCompiler. User didn't ask for
                    // those. There can be at most one per
                    // split. (Though there can be nested
                    // splits.)
                    tmpStore = plan;
                    sFile = store.getSFile();
                } else if (store.getInputSpec() != null) {
                    // We set the input spec for store
                    // operators that had a corresponding load
                    // but we eliminated it in the
                    // PigServer. There could be multiple of
                    // those, but they are all reversible, so
                    // any of them will do.
                    lFile = store.getInputSpec();
                }
            }

            if (tmpStore != null && lFile != null) {
                // Only schedule the removal here; the plan is changed
                // after the walk has completed.
                removalQ.add(new RemovableStore(tmpStore, mCurrentWalker.getPlan(), split));
                replacementMap.put(sFile.getFileName(), lFile);
            }
        }
    }
}
